package com.atuguigu.flink.Day05;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import sun.management.Sensor;

import java.sql.Timestamp;

/**
 * Flink streaming job that buffers the readings of {@code sensor_1} in keyed list state and
 * periodically emits how many readings are buffered.
 *
 * <p>Checkpointing is enabled and the state is stored through an {@link FsStateBackend}.
 * The buffered list is never cleared by this job, so every report is a cumulative count
 * of all readings seen for the key so far.
 */
public class Example7 {

    /** Interval between two checkpoints, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 10 * 1000L;

    /** Delay between the first un-reported reading and the next report, in milliseconds. */
    private static final long REPORT_DELAY_MS = 10 * 1000L;

    /** Directory (as a {@code file:} URI) in which checkpoints are written. */
    private static final String CHECKPOINT_URI =
            "file:///D:/ideaproject/maven/FlinkBigData1021/src/main/resources/checkpoints";

    /**
     * Builds the pipeline and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(1);
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        env.setStateBackend(new FsStateBackend(CHECKPOINT_URI));

        env
                .addSource(new SensorSource())
                .filter(r -> r.id.equals("sensor_1"))
                .keyBy(r -> r.id)
                .process(new BufferedReadingCounter())
                .print();

        // Without this call the pipeline is only declared and never actually started.
        env.execute();
    }

    /**
     * Stores every incoming reading in list state and, once a processing-time timer fires,
     * emits the number of readings currently held in that list.
     */
    private static final class BufferedReadingCounter
            extends KeyedProcessFunction<String, SendsorReading, String> {

        /** All readings received for the current key. */
        private transient ListState<SendsorReading> listState;

        /** Timestamp of the pending timer for the current key; {@code null} when none is pending. */
        private transient ValueState<Long> timeTs;

        /**
         * Obtains the state handles from the runtime context.
         *
         * @param parameters configuration passed in by the runtime
         * @throws Exception if the superclass initialisation fails
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            listState = getRuntimeContext().getListState(
                    new ListStateDescriptor<SendsorReading>("sensor", Types.POJO(SendsorReading.class)));
            timeTs = getRuntimeContext().getState(
                    new ValueStateDescriptor<Long>("timer", Types.LONG));
        }

        /**
         * Buffers the reading and registers a report timer if none is pending for the key.
         *
         * @param value the incoming reading
         * @param ctx   context giving access to the timer service
         * @param out   collector for results; nothing is emitted here
         * @throws Exception if state access fails
         */
        @Override
        public void processElement(SendsorReading value, Context ctx, Collector<String> out) throws Exception {
            listState.add(value);
            if (timeTs.value() == null) {
                // Read the clock once so the registered timer and the stored timestamp agree.
                long fireAt = ctx.timerService().currentProcessingTime() + REPORT_DELAY_MS;
                ctx.timerService().registerProcessingTimeTimer(fireAt);
                timeTs.update(fireAt);
            }
        }

        /**
         * Emits the number of buffered readings and marks the timer as no longer pending.
         *
         * @param timestamp the timestamp of the firing timer
         * @param ctx       context of the firing timer
         * @param out       collector receiving the report line
         * @throws Exception if state access fails
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            super.onTimer(timestamp, ctx, out);

            // Count by iterating: a spliterator's exact size is -1 when it is not SIZED.
            long count = 0L;
            Iterable<SendsorReading> buffered = listState.get();
            if (buffered != null) {
                for (SendsorReading ignored : buffered) {
                    count++;
                }
            }

            out.collect("共" + count + "条数据");
            // Clearing the marker lets the next reading register a new timer.
            timeTs.clear();
        }
    }
}
